package com.shujia.spark.streaming

import java.util

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import redis.clients.jedis.Jedis

import scala.collection.mutable

/**
  * Spark Streaming demo that reads a Kafka topic through the direct (kafka010) API
  * and keeps the consumed offsets in a Redis hash instead of committing them to Kafka.
  *
  * Redis layout: hash key = "<groupId>:<topic>", field = partition number,
  * value = the batch's untilOffset for that partition (where the next run resumes).
  */
object Demo7Direct {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("streaming")
      .setMaster("local[4]")

    // Create the streaming context; the batch interval (5 seconds) is how often a batch is computed.
    val ssc = new StreamingContext(conf, Durations.seconds(5))

    /**
      * auto.offset.reset values:
      * earliest - if a partition has a committed offset, resume from it; otherwise consume from the beginning
      * latest   - if a partition has a committed offset, resume from it; otherwise consume only newly produced data
      * none     - resume from the committed offsets when every partition has one; throw if any partition has none
      */
    val groupId = "asdasda"

    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "master:9092,node1:9092,node2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> "false"
    )

    // Single definition of the topic name, reused for the subscription, the Redis key
    // and the TopicPartition objects so the three can never drift apart.
    val topicName = "test_topic2"
    val topics = Array(topicName)

    // Redis hash key under which this consumer group's offsets for the topic are stored.
    val key: String = groupId + ":" + topicName

    import scala.collection.JavaConverters._

    // Read the previously stored offsets (partition -> offset) from Redis.
    // The connection is only needed for this lookup, so it is closed right after.
    val redis = new Jedis("master", 6379)
    val partitionOffset: Map[TopicPartition, Long] =
      try {
        val stored: util.Map[String, String] = redis.hgetAll(key)
        val scalaMap: mutable.Map[String, String] = stored.asScala

        scalaMap.map { case (partition, offset) =>
          (new TopicPartition(topicName, partition.toInt), offset.toLong)
        }.toMap
      } finally {
        redis.close()
      }

    println(partitionOffset)

    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      // Start from the offsets restored from Redis; partitions without an entry
      // fall back to the auto.offset.reset policy.
      Subscribe[String, String](topics, kafkaParams, partitionOffset)
    )

    stream.foreachRDD(rdd => {

      // User-defined processing logic goes here.
      rdd.map(_.value()).foreach(println)

      // Offset ranges covered by this batch, one per Kafka partition.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      /**
        * Persist the offsets to Redis.
        * key   : consumer group + ":" + topic
        * field : partition
        * value : untilOffset
        *
        * Start redis with:
        * ./redis-server redis.conf
        */
      // A connection is opened per batch and always closed afterwards; without the
      // close, every batch would leak one Redis connection.
      val jedis = new Jedis("master", 6379)
      try {
        for (offsetRange <- offsetRanges) {
          val fromOffset: Long = offsetRange.fromOffset
          val partition: Int = offsetRange.partition
          val topic: String = offsetRange.topic
          val untilOffset: Long = offsetRange.untilOffset

          println(s"$topic\t$partition\t$fromOffset\t$untilOffset")

          // Save the consumed offset for this partition.
          jedis.hset(key, partition.toString, untilOffset.toString)
        }
      } finally {
        jedis.close()
      }

    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}
